# set up a project
import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
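A quick sanity check (just a sketch, not part of the original notes): the Delta extension and catalog set above should be visible on the running session.

# confirm the session picked up the Delta configuration
print(spark.version)
print(spark.conf.get("spark.sql.extensions"))
print(spark.conf.get("spark.sql.catalog.spark_catalog"))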
Basic Tutorial
# create a table (run once)
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
# running it again raises the following error:
# AnalysisException: [DELTA_PATH_EXISTS] Cannot write to already existent path file
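One way to avoid the DELTA_PATH_EXISTS error on re-runs (a sketch, not from the original notes): use a non-default save mode, or check whether the table already exists before writing.

from delta.tables import DeltaTable

# "ignore" silently skips the write when the path already holds a table
data.write.format("delta").mode("ignore").save("/tmp/delta-table")

# alternatively, only create the table when nothing is there yet
if not DeltaTable.isDeltaTable(spark, "/tmp/delta-table"):
    data.write.format("delta").save("/tmp/delta-table")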
# Read file from path
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
| 2|
| 3|
| 4|
| 0|
| 1|
+---+
# update table data
data = spark.range(5, 10)
# brute force update: overwrite the whole table
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
# conditional update
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
    condition = expr("id % 2 == 0"),
    set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
    .merge(
        newData.alias("newData"),
        "oldData.id = newData.id") \
    .whenMatchedUpdate(set = { "id": col("newData.id") }) \
    .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
    .execute()
deltaTable.toDF().show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
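Before time travelling it helps to see which versions exist; each write above (create, overwrite, update, delete, merge) should appear as a separate version in the table history (sketch, using the DeltaTable handle created earlier).

# list the table versions and the operation that produced each one
deltaTable.history().select("version", "timestamp", "operation").show()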
# time travel
df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
| 2|
| 3|
| 4|
| 0|
| 1|
+---+
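Time travel also works by timestamp instead of version number; the timestamp below is only a placeholder (sketch).

# read the table as of a point in time (placeholder timestamp)
df = spark.read.format("delta") \
    .option("timestampAsOf", "2023-12-28 19:00:00") \
    .load("/tmp/delta-table")
df.show()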
# perform a live streaming write to the table
streamingDf = spark.readStream.format("rate").load()

stream = streamingDf \
    .selectExpr("value as id") \
    .writeStream.format("delta") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start("/tmp/delta-table")
23/12/28 19:08:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/28 19:08:27 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
23/12/28 19:08:37 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
23/12/28 19:08:47 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
23/12/28 19:08:57 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
23/12/28 19:09:07 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
stream.stop()
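The table can also be read as a stream while the write above is running; something like this should echo new rows to the console (a sketch following the Delta quickstart pattern).

# read the Delta table as a stream and print changes to the console
stream2 = spark.readStream.format("delta") \
    .load("/tmp/delta-table") \
    .writeStream.format("console") \
    .start()
# let it run for a while, then:
stream2.stop()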
Built-in data?
"logs.json") spark.read.json(